实战 | 使用 Kotlin Flow 构建数据流 "管道"
△ 使用 Kotlin Flow 构建数据流 "管道"
Bilibili 视频链接
https://www.bilibili.com/video/BV1db4y1p7nF/
单向数据流
△ 加载数据流的过程
每款 Android 应用都需要以某种方式收发数据,比如从数据库获取用户名、从服务器加载文档,以及对用户进行身份验证等。接下来,我们将介绍如何将数据加载到 Flow,然后经过转换后暴露给视图进行展示。
为了大家更方便地理解 Flow,我们以 Pancho (潘乔) 的故事来展开。当住在山上的 Pancho 想从湖中获取淡水时,会像大多数新手一开始一样,拿个水桶走到湖边取水,然后再走回来。
响应式编程
更好的方式则是让数据只在一个方向上流动,并创建一些基础设施 (像 Pancho 铺设管道那样) 来组合和转换这些数据流,这些管道可以随着状态的变化而修改,比如在用户退出登录时重新安装管道。
△ 单向数据绑定
使用 Flow
可以想象对于这些组合和转换来说,我们需要一个成熟的工具来完成这些操作。在本文中我们将使用 Kotlin Flow 来实现。Flow 并不是唯一的数据流构建器,不过得益于它是协程的一部分并且得到了很好的支持。我们刚才一直用作比喻的水流,在协程库里称之为 Flow 类型,我们用泛形 T 来指代数据流承载的用户数据或者页面状态等任何类型。
△ 生产者和消费者
生产者会将数据 emit (发送) 到数据流中,而消费者则从数据流中 collect (收集) 这些数据。在 Android 中数据源或存储区通常是应用数据的生产者;消费者则是视图,它会把数据显示在屏幕上。
大多数情况下您都无需自行创建数据流,因为数据源中依赖的库,例如 DataStore、Retrofit、Room 或 WorkManager 等常见的库都已经与协程及 Flow 集成在一起了。这些库就像是水坝,它们使用 Flow 来提供数据,您无需了解数据是如何生成的,只需 "接入管道" 即可。
△ 提供 Flow 支持的库
@DAO
interface CodelabsDAO {
@Query("SELECT * FROM codelabs")
fun getAllCodelabs(): Flow<List<Codelab>>
}
创建 Flow
如果您要自己创建数据流,有一些方案可供选择,比如数据流构建器。假设我们处于 UserMessagesDataSource 中,当您希望频繁地在应用内检查新消息时,可以将用户消息暴露为消息列表类型的数据流。我们使用数据流构建器来创建数据流,因为 Flow 是在协程上下文环境中运行的,它以挂起代码块作为参数,这也意味着它能够调用挂起函数,我们可以在代码块中使用 while(true) 来循环执行我们的逻辑。
在示例代码中,我们首先从 API 获取消息,然后使用 emit 挂起函数将结果添加到 Flow 中,这将挂起协程直到收集器接收到数据项,最后我们将协程挂起一段时间。在 Flow 中,操作会在同一个协程中顺序执行,使用 while(true) 循环可以让 Flow 持续获取新消息直到观察者停止收集数据。传递给数据流构建器的挂起代码块通常被称为 "生产者代码块"。
class UserMessagesDataSource(
private val messagesApi: MessagesApi,
private val refreshIntervalMs: Long = 5000
) {
val latestMessages: Floa<List<Message>> = flow {
white(true) {
val userMessages = messagesApi.fetchLatestMessages()
emit(userMessages) // 将结果发送给 Flow
delay(refreshIntervalMs) // ⏰ 挂起一段时间
}
}
}
转换 Flow
在 Android 中,生产者和消费者之间的层可以使用中间运算符修改数据流来适应下一层的要求。
在本例中,我们将 latestMessages 流作为数据流的起点,则可以使用 map 运算符将数据转换为不同的类型,例如我们可以使用 map lambda 表达式将来自数据源的原始消息转换为 MessagesUiModel,这一操作可以更好地抽象当前层级,每个运算符都应根据其功能创建一个新的 Flow 来发送数据。我们还可以使用 filter 运算符过滤数据流来获得包含重要通知的数据流。而 catch 运算符则可以捕获上游数据流中发生的异常,上游数据流是指在生产者代码块和当前运算符之间调用的运算符产生的数据流,而在当前运算符之后生成的数据流则被称为下游数据流。catch 运算符还可以在有需要的时候再次抛出异常或者发送新值,我们在示例代码中可以看到其在捕获到 IllegalArgumentExceptions 时将其重新抛出,并且在发生其他异常时发送一个空列表:
val importantUserMessages: Flow<MessageUiModel> =
userMessageDataSource.latestMessages
.map { userMessage ->
userMessages.toUiModel()
}
.filter { messageUiModel ->
messagesUiModel.containsImportantNotifications()
}
.catch { e ->
analytics.log("Error loading reserved event")
if (e is IllegalArgumentException) throw e
else emit(emptyList())
}
收集 Flow
现在我们已经了解过如何生成和修改数据流,接下来了解一下如何收集数据流。收集数据流通常发生在视图层,因为这是我们想要在屏幕上显示数据的地方。
在本例中,我们希望列表中能够显示最新消息以便 Pancho 能够了解最新动态。我们可以使用终端运算符 collect 来监听数据流发送的所有值,collect 接收一个函数作为参数,每个新值都会调用该参数,并且由于它是一个挂起函数,因此需要在协程中执行。
userMessages.collect { messages ->
listAdapter.submitList(messages)
}
在 Flow 中使用终端运算符将按需创建数据流并开始发送值,而相反的是中间操作符只是设置了一个操作链,其会在数据被发送到数据流时延迟执行。每次对 userMessages 调用 collect 时都会创建一个新的数据流,其生产者代码块将根据自己的时间间隔开始刷新来自 API 的消息。在协程中我们将这种按需创建并且只有在被观察时才会发送数据的数据流称之为冷流 (Cold Stream)。
在 Android 视图上收集数据流
在 Android 的视图中收集数据流要注意两点,第一是在后台运行时不应浪费资源,第二是配置变更。
安全收集
假设我们在 MessagesActivity 中,如果希望在屏幕上显示消息列表,则应该当界面没有显示在屏幕上时停止收集,就像是 Pancho 在刷牙或者睡觉时应该关上水龙头一样。我们有多种具有生命周期感知能力的方案,来实现当信息不在屏幕上展示就不从数据流中收集信息的功能,比如 androidx.lifecycle:lifecycle-runtime-ktx 包中的 Lifecycle.repeatOnLifecycle(state) 和 Flow<T>.flowWithLifecycle(lifecycle, state)。您还可以在 ViewModel 中使用 androidx.lifecycle:lifecycle-livedata-ktx 包里的 Flow<T>.asLiveData(): LiveData 将数据流转换为 LiveData,这样就可以像往常一样使用 LiveData 来实现这件事情。不过为了简单起见,这里推荐使用 repeatOnLifecycle 从界面层收集数据流。
repeatOnLifecycle 是一个接收 Lifecycle.State 作为参数的挂起函数,该 API 具有生命周期感知能力,所以能够在当生命周期进入响应状态时自动使用传递给它的代码块启动新的协程,并且在生命周期离开该状态时取消该协程。在上面的例子中,我们使用了 Activity 的 lifecycleScope 来启动协程,由于 repeatOnLifecycle 是挂起函数,所以它需要在协程中被调用。最佳实践是在生命周期初始化时调用该函数,就像上面的例子中我们在 Activity 的 onCreate 中调用一样:
import androidx.lifecycle.repeatOnLifecycle
class MessagesActivity : AppCompatActivity() {
val viewModel: MessagesViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED)
viewModel.userMessages.collect { messages ->
listAdapter.submitList(messages)
}
}
// 协程将会在 lifecycle 进入 DESTROYED 后被恢复
}
}
}
repeatOnLifecycle 的可重启行为充分考虑了界面的生命周期,不过需要注意的是,直到生命周期进入 DESTROYED,调用 repeatOnLifecycle 的协程都不会恢复执行,因此如果您需要从多个数据流中进行收集,则应在 repeatOnLifecycle 代码块内多次使用 launch 来创建协程:
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
launch {
viewModel.userMessages.collect { … }
}
launch {
otherFlow.collect { … }
}
}
}
如果只需从一个数据流中进行收集,则可使用 flowWithLifecycle 来收集数据,它能够在生命周期进入目标状态时发送数据,并在离开目标状态时取消内部的生产者:
lifecycleScope.launch {
viewModel.userMessages
.flowWithLifecycle(lifecycle, State.STARTED)
.collect { messages ->
listAdapter.submitList(messages)
}
}
为了能够直观地展示具体的运作过程,我们来探索一下此 Activity 的生命周期,首先是创建完成并向用户可见;接下来用户按下了主屏幕按钮将应用退到后台,此时 Activity 会收到 onStop 信号;当重新打开应用时又会调用 onStart。如果您调用 repeatOnLifecycle 并传入 STARTED 状态,界面就只会在屏幕上显示时收集数据流发出的信号,并且在应用转到后台时取消收集。
△ Activity 的生命周期
class MessagesActivity : AppCompatActivity() {
val viewModel: MessagesViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
// ❌ 危险的操作
lifecycleScope.launch {
viewModel.userMessage.collect { messages ->
listAdapter.submitList(messages)
}
}
// ❌ 危险的操作
LifecycleCoroutineScope.launchWhenX {
flow.collect { … }
}
}
}
配置变更
当您向视图暴露数据流时,必须要考虑到您正在尝试在具有不同生命周期的两个元素之间传递数据,并不是所有生命周期都会出现问题,但在 Activity 和 Fragment 的生命周期里会比较棘手。当设备旋转或者接收到配置变更时,所有的 Activity 都可能会重启但 ViewModel 却能被保留,因此您不能把任意数据流都简单地从 ViewModel 中暴露出来。
△ 旋转屏幕会重建 Activity 但能够保留 ViewModel
以如下代码中的冷流为例,由于每次收集冷流时它都会重启,所以在设备旋转之后会再次调用 repository.fetchItem()。我们需要某种缓冲区机制来保障无论重新收集多少次都可以保持数据,并在多个收集器之间共享数据,而 StateFlow 正是为了此用途而设计的。在我们的湖泊比喻中,StateFlow 就好比水箱,即使没有收集器它也能持有数据。因为它可以多次被收集,所以能够放心地将其与 Activity 或 Fragment 一起使用。
val result: Flow<Result<UiState>> = flow {
emit(repository.fetchItem())
}
private val _myUiState = MutableStateFlow<MyUiState>()
val myUiState: StateFlow<MyUiState> = _myUiState
init {
viewModelScope.launch {
_muUiState.value = Result.Loading
_myUiState.value = repository.fetchStuff()
}
}
val result: StateFlow<Result<UiState>> = someFlow
.stateIn(
initialValue = Result.Loading
scope = viewModelScope,
started = WhileSubscribed(5000),
)
我们可以通过设置超时时间来正确判断不同的场景,当停止收集 StateFlow 时,不会立即停止所有上游数据流,而是会等待一段时间,如果在超时前再次收集数据则不会取消上游数据流,这就是 WhileSubscribed(5000) 的作用。当设置了超时时间后,如果按下主屏幕按钮会让视图立即结束收集,但 StateFlow 会经过我们设置的超时时间之后才会停止其上游数据流,如果用户再次打开应用则会自动重启上游数据流。而在旋转场景中视图只停止了很短的时间,无论如何都不会超过 5 秒钟,因此 StateFlow 并不会重启,所有的上游数据流都将会保持在活跃状态,就像什么都没有发生一样可以做到即时向用户呈现旋转后的屏幕。
https://developer.android.google.cn/kotlin/flow/stateflow-and-sharedflow
测试数据流
class MyFakeRepository : MyRepository {
fun observeCount() = flow {
emit(ITEM_1)
}
}
如果受测单元暴露一个数据流,并且您希望验证该值或一系列值,那么您可以通过多种方式收集它们。您可以对数据流调用 first() 方法以进行收集并在接收到第一个数据项后停止收集。您还可以调用 take(5) 并使用 toList 终端操作符来收集恰好 5 条消息,这种方法可能非常有帮助。
@Test
fun myTest() = runBlocking {
// 收集第一个数据然后停止收集
val firstItem = repository.counter.first()
// 收集恰好 5 条消息
val first = repository.messages.take(5).toList()
}
回顾
感谢阅读本文,希望您通过本文内容已经了解到为什么响应式架构值得投资,以及如何使用 Kotlin Flow 构建您的基础设施。文末提供了有关这方面的资料,包括涵盖基础知识的指南以及深入探讨某些主题的文章。另外您还可以通过 Google I/O 应用了解这些内容的详细信息,我们在早些时候为其更新了很多有关数据流的内容。
指南: Android 上的 Kotlin 数据流
https://developer.android.google.cn/kotlin/flow 使用更为安全的方式收集 Android UI 数据流 从 LiveData 迁移到 Kotlin 数据流 Flow 操作符 shareIn 和 stateIn 使用须知 设计 repeatOnLifecycle API 背后的故事 示例代码: Google I/O 应用 https://github.com/google/iosched
推荐阅读